home *** CD-ROM | disk | FTP | other *** search
- #!/usr/bin/python
-
- # (c) 2008 Canonical Ltd.
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License along
- # with this program; if not, write to the Free Software Foundation, Inc.,
- # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
import fcntl
import os
import re
import struct
import subprocess
import sys

import apt_pkg
import dbus
import dbus.service
import dbus.mainloop.glib
import gobject

from UbuntuSystemService.utils import *
-
class UnknownProxyTypeError(dbus.DBusException):
    """Raised when a proxy type outside the supported set is requested."""
class InvalidKeyboardTypeError(dbus.DBusException):
    """Raised when the requested keyboard settings fail verification."""
class PermissionDeniedError(dbus.DBusException):
    """Raised when PolicyKit does not authorize the calling client."""
-
-
class ServiceBackend(dbus.service.Object):
    """
    The main backend class that supports various system settings like
    proxy and keyboard.

    The object is exported on the system bus under the name
    DBUS_INTERFACE_NAME at object path '/'.  The set_* methods and
    is_package_system_locked() ask PolicyKit for authorization before
    doing anything; the get_* methods do not.
    """

    # some class properties
    DBUS_INTERFACE_NAME = "com.ubuntu.SystemService"
    SUPPORTED_PROXIES = ("http", "ftp", "https", "socks")

    # default files
    CONSOLE_SETUP_DEFAULT = "/etc/default/console-setup"
    DPKG_LOCK = "/var/lib/dpkg/lock"
    ETC_ENVIRONMENT = "/etc/environment"

    def __init__(self):
        """Claim the service name on the system bus and init apt_pkg."""
        bus_name = dbus.service.BusName(self.DBUS_INTERFACE_NAME,
                                        bus=dbus.SystemBus())
        dbus.service.Object.__init__(self, bus_name, '/')
        apt_pkg.InitConfig()

    # authorization -------------------------------------------------
    def _authWithPolicyKit(self, sender, connection, priv):
        """
        Ask PolicyKit whether the D-Bus caller may perform an action.

        sender     -- unique bus name of the calling client
        connection -- connection the call arrived on (unused, kept so
                      that existing callers keep working)
        priv       -- the PolicyKit action id to check

        Returns the "is authorized" value of CheckAuthorization().
        """
        system_bus = dbus.SystemBus()
        obj = system_bus.get_object("org.freedesktop.PolicyKit1",
                                    "/org/freedesktop/PolicyKit1/Authority")
        policykit = dbus.Interface(obj,
                                   "org.freedesktop.PolicyKit1.Authority")
        # Identify the caller by its bus name instead of by its PID:
        # a PID looked up here may already belong to a different
        # process by the time PolicyKit inspects it.
        subject = ('system-bus-name',
                   {'name': dbus.String(sender, variant_level=1)})
        # a dict with one (empty) entry lets dbus-python guess the
        # a{ss} signature of the "details" argument
        details = {'': ''}
        flags = dbus.UInt32(1)  # AllowUserInteraction = 0x00000001
        cancel_id = ''
        (ok, is_challenge, result_details) = policykit.CheckAuthorization(
            subject, priv, details, flags, cancel_id)
        return ok

    def _require_auth(self, sender, conn, *privs):
        """
        Check the given PolicyKit actions in order and return as soon as
        one of them is authorized.

        Raises PermissionDeniedError if none of them is.
        """
        for priv in privs:
            if self._authWithPolicyKit(sender, conn, priv):
                return
        raise PermissionDeniedError("Permission denied by policy")

    # file helpers --------------------------------------------------
    def _read_lines(self, path):
        """
        Return the lines of `path` (line endings included), or an empty
        list if `path` is not an existing regular file.
        """
        if not os.path.isfile(path):
            return []
        with open(path) as f:
            return f.readlines()

    def _write_text(self, path, text):
        """Replace the content of `path` with `text`."""
        with open(path, "w") as f:
            f.write(text)

    def _append_line(self, path, line):
        """
        Append `line` to `path` (creating the file if needed).

        If the last existing line has no trailing newline one is added
        first so that `line` does not end up glued to it.
        """
        existing = self._read_lines(path)
        if existing and not existing[-1].endswith("\n"):
            line = "\n" + line
        with open(path, "a") as f:
            f.write(line)

    def _update_etc_environment(self, prefix, new_line):
        """
        Rewrite ETC_ENVIRONMENT.

        Every line starting with `prefix` is replaced by `new_line`, or
        removed if `new_line` is None.  If no line matched and
        `new_line` is not None it is appended instead.  A missing file
        is treated like an empty one.
        """
        found = False
        new_content = []
        for line in self._read_lines(self.ETC_ENVIRONMENT):
            if line.startswith(prefix):
                found = True
                if new_line is not None:
                    new_content.append(new_line)
            else:
                new_content.append(line)
        if found:
            self._write_text(self.ETC_ENVIRONMENT, "".join(new_content))
        elif new_line is not None:
            self._append_line(self.ETC_ENVIRONMENT, new_line)

    def _apt_conffiles(self):
        """
        Return the existing apt configuration files: apt.conf first,
        followed by the regular files in apt.conf.d (in os.listdir()
        order).  Missing files/directories are skipped.
        """
        confdir = apt_pkg.Config.FindDir("Dir::Etc")
        candidates = [os.path.join(confdir, "apt.conf")]
        partsdir = os.path.join(confdir, "apt.conf.d")
        if os.path.isdir(partsdir):
            candidates += [os.path.join(partsdir, n)
                           for n in os.listdir(partsdir)]
        return [p for p in candidates if os.path.isfile(p)]

    # proxy stuff ---------------------------------------------------
    def _etc_environment_proxy(self, proxy_type):
        """
        internal helper that returns the <proxy_type>_proxy value from
        /etc/environment ("" if the file or the entry is missing)
        """
        prefix = "%s_proxy=" % proxy_type
        for line in self._read_lines(self.ETC_ENVIRONMENT):
            if line.startswith(prefix):
                # everything after the first "=" is the value, it may
                # contain further "=" characters itself
                return line.strip()[len(prefix):].strip('"')
        return ""

    def _http_proxy(self):
        " internal helper that returns the current http proxy "
        # FIXME: apt may be configured with a different proxy (see
        #        _apt_proxy), what to do if both are different?
        return self._etc_environment_proxy("http")

    def _apt_proxy(self, proxy_type):
        " internal helper that returns the configured apt proxy"
        apt_pkg.InitConfig()
        return apt_pkg.Config.Find("Acquire::%s::proxy" % proxy_type)

    def _ftp_proxy(self):
        " internal helper that returns the current ftp proxy "
        # FIXME: apt may be configured with a different proxy (see
        #        _apt_proxy), what to do if both are different?
        return self._etc_environment_proxy("ftp")

    def _socks_proxy(self):
        " internal helper that returns the current socks proxy "
        return self._etc_environment_proxy("socks")

    def _ftp_apt_proxy(self):
        " internal helper that returns the configured apt ftp proxy"
        return self._apt_proxy("ftp")

    def _https_proxy(self):
        " internal helper that returns the current https proxy "
        return self._etc_environment_proxy("https")

    def _verify_proxy(self, proxy_type, proxy):
        " internal helper, verify that the proxy string is valid "
        return verify_proxy(proxy_type, proxy)

    def _verify_no_proxy(self, proxy):
        " internal helper, verify that the no_proxy string is valid "
        return verify_no_proxy(proxy)

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='s',
                         out_signature='s',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def get_proxy(self, proxy_type, sender=None, conn=None):
        """
        Get the current system-wide proxy for type "proxy_type"

        The value is read from /etc/environment; an empty string is
        returned if no proxy of that type is configured there.

        Raises UnknownProxyTypeError for an unsupported proxy_type.
        """
        getters = {
            "http": self._http_proxy,
            "https": self._https_proxy,
            "ftp": self._ftp_proxy,
            "socks": self._socks_proxy,
        }
        if proxy_type not in getters:
            raise UnknownProxyTypeError(
                "proxy_type '%s' is unknown in get_proxy" % proxy_type)
        return getters[proxy_type]()

    def _write_apt_proxy(self, proxy_type, new_proxy):
        """
        helper that writes the new apt proxy

        Returns False if the proxy string does not verify or if apt
        reports a proxy that could not be located in a config file.
        """
        if not self._verify_proxy(proxy_type, new_proxy):
            return False
        new_line = "Acquire::%s::proxy \"%s\";\n" % (proxy_type, new_proxy)
        # check for the easy case (no proxy setting in the config)
        if self._apt_proxy(proxy_type) == "":
            confdir = apt_pkg.Config.FindDir("Dir::Etc")
            self._append_line(os.path.join(confdir, "apt.conf"), new_line)
            return True
        # now the difficult case (search the apt configuration files);
        # only the first file containing the setting is rewritten
        needle = "acquire::%s::proxy" % proxy_type
        for path in self._apt_conffiles():
            found = False
            new_content = []
            for line in self._read_lines(path):
                if line.lower().startswith(needle):
                    found = True
                    line = new_line
                # FIXME: scan for more complicated forms of the proxy
                #        settings and/or scan for the proxy string and
                #        just replace this
                new_content.append(line)
            # if we found/replaced the proxy, write it out now
            if found:
                self._write_text(path, "".join(new_content))
                return True
        return False

    def _write_etc_environment_proxy(self, proxy_type, new_proxy):
        """
        helper that sets <proxy_type>_proxy in /etc/environment,
        returns False if the proxy string does not verify
        """
        if not self._verify_proxy(proxy_type, new_proxy):
            return False
        self._update_etc_environment(
            "%s_proxy=" % proxy_type,
            '%s_proxy="%s"\n' % (proxy_type, new_proxy))
        return True

    def _clear_etc_environment_proxy(self, proxy_type):
        " helper that removes <proxy_type>_proxy from /etc/environment "
        self._update_etc_environment("%s_proxy=" % proxy_type, None)
        return True

    def _clear_apt_proxy(self, proxy_type):
        " helper that clears the apt proxy from all apt config files "
        needle = "acquire::%s::proxy" % proxy_type
        for path in self._apt_conffiles():
            old_content = self._read_lines(path)
            new_content = [line for line in old_content
                           if not line.lower().startswith(needle)]
            # only touch files that actually contained the setting
            if len(new_content) != len(old_content):
                self._write_text(path, "".join(new_content))
        return True

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='ss',
                         out_signature='b',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def set_proxy(self, proxy_type, new_proxy, sender=None, conn=None):
        """
        Set a new system-wide proxy that looks like e.g.:
        http://proxy.host.net:port/

        This function will set a new apt configuration and
        modify /etc/environment.  An empty new_proxy removes the
        setting from both places.

        Returns True only if both places were updated.  Raises
        PermissionDeniedError or UnknownProxyTypeError.
        """
        self._require_auth(sender, conn,
                           "com.ubuntu.systemservice.setproxy",
                           "org.gnome.gconf.defaults.set-system")

        # check if something supported is set
        if proxy_type not in self.SUPPORTED_PROXIES:
            raise UnknownProxyTypeError(
                "proxy_type '%s' is unknown in set_proxy" % proxy_type)

        # set (or reset); both targets are always attempted, even if
        # the first one fails
        if new_proxy is None or new_proxy == "":
            apt_ok = self._clear_apt_proxy(proxy_type)
            env_ok = self._clear_etc_environment_proxy(proxy_type)
        else:
            apt_ok = self._write_apt_proxy(proxy_type, new_proxy)
            env_ok = self._write_etc_environment_proxy(proxy_type, new_proxy)
        return bool(apt_ok and env_ok)

    def _clear_etc_environment_no_proxy(self):
        " helper that removes no_proxy from /etc/environment "
        self._update_etc_environment("no_proxy=", None)
        return True

    def _write_etc_environment_no_proxy(self, new_proxy):
        """
        helper that sets no_proxy in /etc/environment, returns False
        if the no_proxy string does not verify
        """
        if not self._verify_no_proxy(new_proxy):
            return False
        self._update_etc_environment("no_proxy=",
                                     'no_proxy="%s"\n' % new_proxy)
        return True

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='s',
                         out_signature='b',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def set_no_proxy(self, new_no_proxy, sender=None, conn=None):
        """
        Set a new system-wide no_proxy list that looks like e.g.:
        localhost,foo.com

        This function will modify /etc/environment.  An empty
        new_no_proxy removes the setting.

        Raises PermissionDeniedError if the caller is not authorized.
        """
        self._require_auth(sender, conn,
                           "com.ubuntu.systemservice.setnoproxy",
                           "org.gnome.gconf.defaults.set-system")

        # set (or reset)
        if new_no_proxy is None or new_no_proxy == "":
            return self._clear_etc_environment_no_proxy()
        return self._write_etc_environment_no_proxy(new_no_proxy)

    # keyboard stuff ---------------------------------------------------
    def _get_keyboard_from_etc(self):
        """
        helper that reads /etc/default/console-setup and gets the
        keyboard settings there

        Returns (model, layout, variant, options); settings that are
        not present in the file are returned as "".
        """
        settings = {
            "XKBMODEL": "",
            "XKBLAYOUT": "",
            "XKBVARIANT": "",
            "XKBOPTIONS": "",
        }
        with open(self.CONSOLE_SETUP_DEFAULT) as f:
            for line in f:
                # split on the first "=" only, the value may contain "="
                (key, sep, value) = line.partition("=")
                if sep and key in settings:
                    settings[key] = value.strip('"\n')
        return (settings["XKBMODEL"], settings["XKBLAYOUT"],
                settings["XKBVARIANT"], settings["XKBOPTIONS"])

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='',
                         out_signature='ssss',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def get_keyboard(self, sender=None, conn=None):
        """
        Get the current keyboard configuration. This returns four
        strings: (model, layout, variant, options)
        """
        return self._get_keyboard_from_etc()

    def _set_keyboard_to_etc(self, model, layout, variant, options):
        """
        helper that writes /etc/default/console-setup

        Only XKB* lines that already exist in the file are updated.
        The new content is written to a ".new" file that is then
        renamed over the original.
        """
        # FIXME: what to do if not os.path.exists(self.CONSOLE_SETUP_DEFAULT)
        new_values = {
            "XKBMODEL": model,
            "XKBLAYOUT": layout,
            "XKBVARIANT": variant,
            "XKBOPTIONS": options,
        }
        with open(self.CONSOLE_SETUP_DEFAULT) as f:
            old_content = f.readlines()
        content = []
        for line in old_content:
            (key, sep, value) = line.partition("=")
            if sep and key in new_values:
                line = '%s="%s"\n' % (key, new_values[key])
            content.append(line)
        # if something changed, write
        if content != old_content:
            tmp_name = self.CONSOLE_SETUP_DEFAULT + ".new"
            self._write_text(tmp_name, "".join(content))
            os.rename(tmp_name, self.CONSOLE_SETUP_DEFAULT)
        return True

    def _verify_keyboard_settings(self, model, layout, variant, options):
        """
        helper that verifies the settings

        Returns True if all four strings consist of whitelisted
        characters only and 'ckbcomp' exits with status 0 for them.
        """
        # check against char whitelist; \Z (unlike $) does not accept
        # a trailing newline
        allowed = re.compile(r"\A[0-9a-zA-Z:,_]*\Z")
        for s in (model, layout, variant, options):
            if not allowed.match(s):
                return False
        # check if 'ckbcomp' can compile it
        cmd = ["ckbcomp"]
        if model:
            cmd += ["-model", model]
        if layout:
            cmd += ["-layout", layout]
        if variant:
            cmd += ["-variant", variant]
        if options:
            cmd += ["-option", options]
        # the output is not interesting, only the exit status
        with open(os.devnull, "w") as devnull:
            ret = subprocess.call(cmd, stdout=devnull)
        return (ret == 0)

    def _run_setupcon(self):
        """
        helper that runs setupcon to activate the settings, taken from
        oem-config (/usr/lib/oem-config/console/console-setup-apply)

        Returns True if setupcon exited with status 0.
        """
        ret = subprocess.call(["setupcon", "--save-only"])
        # update-initramfs is only started, its result is not waited for
        subprocess.Popen(["/usr/sbin/update-initramfs", "-u"])
        return (ret == 0)

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='ssss',
                         out_signature='b',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def set_keyboard(self, model, layout, variant, options,
                     sender=None, conn=None):
        """
        Set the system default keyboard configuration.

        It expects four input arguments (strings):
        model -- the model (evdev, pc105, ...)
        layout -- the layout (de, us, ...)
        variant -- the variant (nodeadkeys, ..)
        options -- keyboard options (nocaps, ...)

        It returns True on success.  Raises PermissionDeniedError if
        the caller is not authorized and InvalidKeyboardTypeError if
        the settings do not verify.
        """
        self._require_auth(sender, conn,
                           "com.ubuntu.systemservice.setkeyboard",
                           "org.gnome.gconf.defaults.set-system")

        # if no keyboard model is set, try to guess one
        # this is based on the "console-setup.config" code that
        # defaults to pc105
        if not model:
            model = {"us": "pc104",
                     "br": "abnt2",
                     "jp": "jp106"}.get(layout, "pc105")

        # verify the settings
        if not self._verify_keyboard_settings(model, layout,
                                              variant, options):
            raise InvalidKeyboardTypeError("Invalid keyboard set")

        # apply
        if not self._set_keyboard_to_etc(model, layout, variant, options):
            return False
        if not self._run_setupcon():
            return False
        return True

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='',
                         out_signature='b',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def is_package_system_locked(self, sender=None, conn=None):
        """
        Check if the package system is locked

        Returns True if a write lock is held on DPKG_LOCK.  Raises
        PermissionDeniedError if the caller is not authorized.
        """
        self._require_auth(sender, conn,
                           "com.ubuntu.systemservice.ispkgsystemlocked")
        # check for file
        # NOTE(review): a missing lock file is reported as "locked";
        #               presumably the conservative answer - confirm
        if not os.path.exists(self.DPKG_LOCK):
            return True
        # check for lock: F_GETLK hands back the description of a lock
        # that conflicts with the write lock described in flk
        flk = struct.pack('hhllhl', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
        with open(self.DPKG_LOCK) as f:
            rv = fcntl.fcntl(f, fcntl.F_GETLK, flk)
        lockv = struct.unpack('hhllhl', rv)[0]
        return (lockv == fcntl.F_WRLCK)
-
if __name__ == "__main__":
    # Install the GLib main loop as dbus-python's default main loop,
    # export the service and then serve requests until terminated.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    server = ServiceBackend()
    main_loop = gobject.MainLoop()
    main_loop.run()
-
-